package queue

import (
	"errors"
	"gitee.com/Rekeeper/orange/cfg"
	"gitee.com/Rekeeper/orange/utils"
	"sync"
	"time"
)

// MqProducer is the publishing side of a message-queue client. NewProducer
// returns a value of this type for the configured driver.
type MqProducer interface {
	// SendMsg publishes a string body to topic and returns the resulting
	// MqMsg, or an error.
	SendMsg(topic string, body string) (mqMsg MqMsg, err error)
	// SendByteMsg is the same as SendMsg but takes the body as raw bytes.
	SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error)
}

// MqConsumer is the receiving side of a message-queue client. NewConsumer
// returns a value of this type for the configured driver.
type MqConsumer interface {
	// ListenReceiveMsgDo subscribes to topic and hands received messages to
	// receiveDo.
	// NOTE(review): whether this call blocks, and on which goroutine
	// receiveDo runs, depends on the driver implementation — confirm there.
	ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg)) (err error)
}

// Message direction markers. The zero value of iota is discarded, so
// SendMsg is 1 and ReceiveMsg is 2.
// NOTE(review): presumably these are the values carried in MqMsg.RunType —
// confirm against the driver implementations.
const (
	_ = iota
	SendMsg
	ReceiveMsg
)

// MqMsg is the driver-independent representation of a queue message, both as
// returned by MqProducer sends and as delivered to MqConsumer callbacks.
type MqMsg struct {
	// RunType is presumably one of SendMsg / ReceiveMsg — TODO confirm.
	RunType int `json:"run_type"`
	// Topic is the topic name of the message.
	Topic string `json:"topic"`
	// MsgId is the message identifier.
	MsgId string `json:"msg_id"`
	// Offset and Partition are positional metadata.
	// NOTE(review): likely only meaningful for drivers that expose them
	// (e.g. kafka) — verify per driver.
	Offset    int64     `json:"offset"`
	Partition int32     `json:"partition"`
	Timestamp time.Time `json:"timestamp"`

	// Body is the raw message payload; see BodyString for a string view.
	Body []byte `json:"body"`
}

// Package-level client registries used by NewProducer and NewConsumer.
//
// The maps are allocated at declaration rather than in an init function so
// they are already usable during package-level variable initialization, which
// Go runs before any init function.
var (
	// mqProducerInstanceMap caches the producers created by NewProducer.
	mqProducerInstanceMap = make(map[string]MqProducer)
	// mqConsumerInstanceMap caches the consumers created by NewConsumer.
	mqConsumerInstanceMap = make(map[string]MqConsumer)
	// mutex is held by NewProducer and NewConsumer while they update the
	// maps above.
	mutex sync.Mutex
)

// NewProducer returns the producer cached for groupName, creating and caching
// a new one when none exists yet.
//
// The backend is chosen by the "queue.driver" configuration value
// ("rocketmq", "kafka" or "redis") and is given the addresses listed in
// "queue.endpoints"; the redis driver uses only the first endpoint.
//
// It returns an error when no endpoints are configured or when groupName is
// empty, and panics when the configured driver is not one of the supported
// values.
func NewProducer(groupName string) (mqClient MqProducer, err error) {
	// The registry is shared between goroutines, so the lookup has to be
	// guarded just like the store below: an unsynchronised map read that
	// races with a write is a data race and can abort the process.
	mutex.Lock()
	cached, found := mqProducerInstanceMap[groupName]
	mutex.Unlock()
	if found {
		return cached, nil
	}

	driver := cfg.GetString("queue.driver", "")
	endpoints := cfg.GetSliceString("queue.endpoints", []string{})
	if len(endpoints) == 0 {
		return nil, errors.New("endpoints is not found.")
	}
	retry := cfg.GetInt("queue.retry", 2)

	if groupName == "" {
		return nil, errors.New("mq groupName is empty.")
	}

	// The lock is deliberately not held while the client is built, so a slow
	// connection attempt cannot block unrelated callers. Two concurrent first
	// calls for the same group may therefore both build a client; the one
	// stored last stays in the registry.
	switch driver {
	case "rocketmq":
		mqClient = RegisterRocketProducerMust(endpoints, groupName, retry)
	case "kafka":
		version := cfg.GetString("queue.version", "2.0.0")
		mqClient = RegisterKafkaProducerMust(KafkaConfig{
			Brokers: endpoints,
			GroupID: groupName,
			Version: version,
		})
	case "redis":
		passwd := cfg.GetString("queue.passwd", "")
		dbnum := cfg.GetInt("queue.redisdb", 0)
		timeout := cfg.GetInt("queue.timeout", 3600)
		mqClient = RegisterRedisMqProducerMust(RedisOption{
			Addr:    endpoints[0],
			Passwd:  passwd,
			DBnum:   dbnum,
			Timeout: timeout,
		}, PoolOption{
			5, 50, 5,
		}, groupName, retry)
	default:
		panic("queue driver is not support")
	}

	mutex.Lock()
	defer mutex.Unlock()
	mqProducerInstanceMap[groupName] = mqClient

	return mqClient, nil
}

// NewConsumer returns a consumer for groupName, creating one with the driver
// named by the "queue.driver" configuration value ("rocketmq", "kafka" or
// "redis").
//
// Consumers are cached under the key groupName + "-" + tag. When
// "queue.multiComsumer" is false the tag is the fixed string "001", so
// repeated calls for the same group return the same instance. When it is true
// the tag is random, so each call builds (and registers) a new consumer.
//
// It returns an error when no endpoints are configured or when groupName is
// empty, and panics when the configured driver is not one of the supported
// values.
func NewConsumer(groupName string) (mqClient MqConsumer, err error) {
	// Whether several consumers may be created for the same group.
	randTag := "001"
	if cfg.GetBool("queue.multiComsumer", true) {
		randTag = string(utils.RandomCreateBytes(6))
	}

	// The same key must be used for the lookup and for the store further
	// down; otherwise the cache can never hit and single-consumer mode would
	// build a new consumer on every call.
	cacheKey := groupName + "-" + randTag

	// Guard the read as well as the write: an unsynchronised map read that
	// races with a write is a data race and can abort the process.
	mutex.Lock()
	cached, found := mqConsumerInstanceMap[cacheKey]
	mutex.Unlock()
	if found {
		return cached, nil
	}

	endpoints := cfg.GetSliceString("queue.endpoints", []string{})
	driver := cfg.GetString("queue.driver", "")
	if len(endpoints) == 0 {
		return nil, errors.New("endpoints is not found.")
	}

	if groupName == "" {
		return nil, errors.New("mq groupName is empty.")
	}

	switch driver {
	case "rocketmq":
		mqClient = RegisterRocketConsumerMust(endpoints, groupName)
	case "kafka":
		version := cfg.GetString("queue.version", "2.0.0")
		// NOTE(review): "ORNAGE" looks like a typo for "ORANGE"; it is kept
		// as-is because changing it would alter the client ID sent to the
		// brokers.
		clientId := "ORNAGE-Consumer-" + groupName
		if cfg.GetBool("queue.randClient", true) {
			clientId += "-" + randTag
		}

		mqClient = RegisterKafkaMqConsumerMust(KafkaConfig{
			Brokers:  endpoints,
			GroupID:  groupName,
			Version:  version,
			ClientId: clientId,
		})
	case "redis":
		passwd := cfg.GetString("queue.passwd", "")
		dbnum := cfg.GetInt("queue.redisdb", 0)
		timeout := cfg.GetInt("queue.timeout", 3600)
		mqClient = RegisterRedisMqConsumerMust(RedisOption{
			Addr:    endpoints[0],
			Passwd:  passwd,
			DBnum:   dbnum,
			Timeout: timeout,
		}, PoolOption{
			5, 50, 5,
		}, groupName)
	default:
		panic("queue driver is not support")
	}

	mutex.Lock()
	defer mutex.Unlock()
	mqConsumerInstanceMap[cacheKey] = mqClient

	return mqClient, nil
}

// BodyString returns the message body converted to a string.
func (m *MqMsg) BodyString() string {
	payload := m.Body
	return string(payload)
}
